package com.middleware.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
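// Extra imports used only by the illustrative createTopic / consumerManualCommit / consumerPerPartition sketches below.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;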

/**
 * @author 马士兵教育
 * @create 2020-12-13 20:14
 */
public class Lesson01 {


    /*
    Create the topic first:
    kafka-topics.sh --zookeeper node03:2181/kafka --create --topic msb-items --partitions 2 --replication-factor 2
     */
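    /*
     * For reference, the same topic can also be created from code with the AdminClient
     * (the CLI above goes through ZooKeeper, the AdminClient talks to the brokers directly).
     * This is only a sketch: the broker list is the same assumption used in consumer() below,
     * and it assumes a kafka-clients version that ships AdminClient (0.11+).
     */
    @Test
    public void createTopic() throws ExecutionException, InterruptedException {
        Properties p = new Properties();
        p.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.111:9092,192.168.0.112:9092,192.168.0.113:9092");
        try (AdminClient admin = AdminClient.create(p)) {
            // 2 partitions, replication factor 2 -- mirrors the CLI command above
            NewTopic topic = new NewTopic("msb-items", 2, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }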





    /* Basic consumption */
    @Test
    public void consumer(){
        /*
        kafka-consumer-groups.sh --bootstrap-server node02:9092  --list
         */

        // Basic configuration
        Properties p = new Properties();
        p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.0.111:9092,192.168.0.112:9092,192.168.0.113:9092");
        p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Consumer-specific settings
        p.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"OOXX");
        // Kafka is an MQ and also a storage system: old records may still be on the broker
        p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");// on the very first start this group has no committed offset yet
        /*
         * What to do when there is no initial offset in Kafka, or if the current offset
         * does not exist any more on the server (e.g. because that data has been deleted):
         *   - earliest: automatically reset the offset to the earliest offset
         *   - latest:   automatically reset the offset to the latest offset
         *   - none:     throw an exception to the consumer if no previous offset is found for the consumer's group
         *   - anything else: throw an exception to the consumer
         */
        p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交时异步提交，丢数据&&重复数据
        //一个运行的consumer ，那么自己会维护自己消费进度
        //一旦你自动提交，但是是异步的
        //1，还没到时间，挂了，没提交，重起一个consuemr，参照offset的时候，会重复消费
        //2，一个批次的数据还没写数据库成功，但是这个批次的offset背异步提交了，挂了，重起一个consuemr，参照offset的时候，会丢失消费

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");//自动提交是异步的，默认5秒
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,""); // POLL 拉取数据，弹性，按需，拉取多少？




        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
        // Subscribe to the topic
        consumer.subscribe(Arrays.asList("msb-items"));

        while(true){
            /*
             * Rule of thumb for processing multiple partitions with multiple threads:
             * treat each poll() as starting one "job"; inside a job, process the partitions
             * in parallel, but keep the jobs themselves serial (the next poll happens only
             * after the previous batch is done). If you have worked with big-data frameworks,
             * this will look familiar; see the consumerPerPartition() sketch below.
             */
            // This has a micro-batch feel to it.
            // The poll timeout is how long to wait for data before returning.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(0));// returns 0..n records; a timeout of 0 returns immediately (busy loop), a few hundred ms is more typical

            if(!records.isEmpty()){
                // How this block is written matters a lot for throughput.
                System.out.println("-----------"+records.count()+"-------------");




                Iterator<ConsumerRecord<String, String>> iter = records.iterator();
                while(iter.hasNext()){
                    // One consumer may own several partitions, but within a consumer group each partition is consumed by exactly one consumer.
                    ConsumerRecord<String, String> record = iter.next();
                    int partition = record.partition();
                    long offset = record.offset();
                    String key = record.key();
                    String value = record.value();

                    System.out.println("key: "+ key+" value: "+ value+ " partition: "+partition + " offset: "+ offset);
                }
            }



        }


    }
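    /*
     * Manual-commit sketch, referenced from the auto-commit discussion in consumer():
     * with enable.auto.commit=false the offsets are committed synchronously only after a
     * batch has been fully processed, trading the loss/duplication window of async
     * auto-commit for at-least-once delivery. Brokers, group id and topic name are the
     * same assumptions used above; the 500 ms poll timeout is arbitrary.
     */
    @Test
    public void consumerManualCommit() {
        Properties p = new Properties();
        p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.0.111:9092,192.168.0.112:9092,192.168.0.113:9092");
        p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"OOXX");
        p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// commit manually below

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p)) {
            consumer.subscribe(Arrays.asList("msb-items"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // do the real work here (e.g. write to the database) before committing
                    System.out.println(record.partition() + "-" + record.offset() + ": " + record.value());
                }
                if (!records.isEmpty()) {
                    consumer.commitSync();// commit only after the whole batch has been processed
                }
            }
        }
    }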


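    /*
     * Per-partition processing sketch, referenced from the comment above poll() in consumer():
     * each poll() is treated as one job, the partitions of that batch are processed in
     * parallel, and invokeAll() blocks so the jobs stay serial. Offsets are committed per
     * partition once the whole batch is done. Brokers, group id and topic name are the same
     * assumptions as above; the pool size of 3 and the 500 ms timeout are arbitrary.
     */
    @Test
    public void consumerPerPartition() throws InterruptedException {
        Properties p = new Properties();
        p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.0.111:9092,192.168.0.112:9092,192.168.0.113:9092");
        p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"OOXX");
        p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// offsets are committed per partition below

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
        consumer.subscribe(Arrays.asList("msb-items"));
        ExecutorService pool = Executors.newFixedThreadPool(3);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) continue;

            // one task per partition in this batch; only this thread ever touches the consumer itself
            List<Callable<Void>> tasks = new ArrayList<>();
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
                tasks.add(() -> {
                    for (ConsumerRecord<String, String> r : partitionRecords) {
                        System.out.println(tp + " offset: " + r.offset() + " value: " + r.value());
                    }
                    return null;
                });
            }
            pool.invokeAll(tasks);// blocks: the next poll (the next job) starts only after this job finishes

            // commit (last offset + 1) for every partition in the batch
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition tp : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
            }
            consumer.commitSync(offsets);
        }
    }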



}
